Fivetran Teleport Sync で RDS for SQL Server のデータを Snowflake に同期してみた #Fivetran

Fivetran Teleport Sync で RDS for SQL Server のデータを Snowflake に同期してみた #Fivetran

Clock Icon2024.03.04

Fivetran で Amazon RDS for SQL Server のデータを Fivetran Teleport Sync で Snowflake に同期してみましたので、その際の手順や挙動について記事にしました。

RDS for SQL Server との連携

Fivetran で RDS SQL Server をデータソースとする場合、以下のポイントに注意します。なお、こちらは 2024/3/4 時点の情報のためご注意ください。

前提条件

  • バージョン
    • SQL Server 2012 - 2019

接続方法

RDS SQL Server への代表的な接続方法として以下があります。

  • 直接接続
    • データベースに対して直接 Fivetran の IP CIDR を許可する
    • SSL/TLS を使用した DB インスタンスへの接続の暗号化が有効であること
  • SSH トンネルによる接続
    • インスタンスに直接接続できない場合のオプション
    • パブリックサブネットの踏み台サーバーを使用
  • AWS PrivateLink
    • インスタンスに直接接続できない場合のオプション
    • Fivetran は、ビジネスクリティカル以上のエディションであることが必要
    • サポートするリージョンにリソースがあること
      • 国内であれば、ap-northeast-1 (Tokyo)

同期方法

RDS SQL Server では、3種類の同期方法が提供されています。主な特徴は以下です。

  • Change tracking
    • SQL Server の変更の追跡(Change Tracking:CT)機能を使用
      • 同期対象の個々のテーブルでも CT を有効にする必要があります
    • 同期対象のテーブルは主キーを持つ必要がある
  • Change data capture
    • SQL Server の変更データ キャプチャ(Change data capture:CDC)機能を使用
      • 同期対象の個々のテーブルでも CDC を有効にする必要があります
    • 主キーのないテーブルでも同期可能
    • 単一の CDC キャプチャ インスタンスを持つテーブルのみを同期可能
      • 新しいテーブルまたは既存のテーブルに列を追加する場合は、それらを含む新しい CDC インスタンスを作成し、古いインスタンスがある場合、削除します
  • Fivetran Teleport Sync
    • 追加の設定は不要
    • 主キーのないテーブルでも同期可能
    • ただし、以下の主キーを持つテーブルでは同期できません
      • BIT
      • DATETIMEOFFSET
      • GEOMETRY
      • GEOGRAPHY
    • 最大 4GB のサイズの主キーを持つテーブルの同期をサポート
      • この制限を超えるテーブルは同期できません

検証環境

以下の環境を使用しています。

  • 宛先
    • Snowflake
  • データソース
    • Amazon RDS
      • SQL Server Express Edition
      • 2019 15.004345.5.v1
    • インスタンスタイプ:db.t3.micro
  • 接続方法
    • 直接接続
  • 同期方法
    • Fivetran Teleport Sync

事前準備

手元のクライアント(sqlcmd)からアクセス可能な検証用の RDS SQL Server を構築し、以下の通り Snowfalke に連携するデータを用意します。

  • データベースの作成
CREATE DATABASE testdb;
GO
  • テーブルの作成とデータの追加
    • はじめに、主キーを持つテーブルを作成します
CREATE TABLE Employees (
    EmployeeID INT PRIMARY KEY IDENTITY(1,1),
    FirstName NVARCHAR(50),
    LastName NVARCHAR(50),
    Position NVARCHAR(50)
);
GO

データを追加

INSERT INTO Employees (FirstName, LastName, Position) VALUES
('John', 'Doe', 'Software Developer'),
('Jane', 'Doe', 'Project Manager'),
('Jim', 'Beam', 'Analyst');
GO
  • Fivetran 用の読み取り専用ユーザーを作成
USE testdb;
CREATE LOGIN fivetran_user WITH PASSWORD = '<パスワード>';
CREATE USER fivetran_user FOR LOGIN fivetran_user;
  • 権限を付与
GRANT SELECT on DATABASE::testdb to fivetran_user;
GO

Fivetran の設定

Destination

データの連携先には Snowflake を使用しました。Snowflake を Destination に設定する方法は、以下の記事をご参照ください。

Connector

コネクタに RDS SQL Server を追加します。

  • 接続情報
  • 接続方法
    • ここでは「Connect directly」を指定しました。
  • 同期方法
    • Fivetran Teleport Sync を指定しました。

下図の通り、Fivetran の IP アドレスのリストが表示されるので、セキュリティグループでインバウンド通信を許可しておきます。

Fivetran IP Addresses | Fivetran

接続設定は以上になります。この状態で接続テストを行うと下図の表示となりました。

最初の証明書はルート証明機関 (CA) で、2 番目はリーフ証明書になります。いずれかを選択し、信頼できるものであることを確認することで、コネクタのセットアップを完了することができます。

What TLS Certificate Should I Trust in Fivetran’s Dashboard

再度接続テストを行います。

接続テストが完了すると、スキーマの情報が取得されます。

情報が取得されると、データベース内のスキーマやテーブルが表示されるので、同期したいテーブル(ここでは Employees テーブル)にチェックが入っている状態で [Save & Continue ] をクリックします。

どの程度まで変更をキャプチャするかの確認画面が表示されるので、任意のオプションを選択します。ここでは「Allow all」を選択し [Continue] をクリックします。

以上で設定は完了です。[Start initial Sync] をクリックし、初期同期を開始します。

  • Snowflakeで確認
    • 初期同期完了後、Snowflake 側で権限を持つユーザーでテーブルを確認します
USE SCHEMA FIVETRAN_DATABASE.SQL_SERVER_RDS_DBO;
SELECT * FROM EMPLOYEES;

下図の通り同期されていました。

主キーありテーブルにおけるスキーマの自動移行の検証

Fivetran では、対応するデータソースや同期方法など条件を満たせば、データソース側のテーブルスキーマが変更された場合でも、自動的にその変更を宛先側に反映します。

RDS SQL Server に対する Fivetran Teleport Sync 時に、どのようにこの変更が反映されるかも確認しておきます。

レコードの追加

はじめに、データソース側で以下の通り、レコードを追加してみます。

INSERT INTO Employees (FirstName, LastName, Position) VALUES ('Alice', 'Johnson', 'Software Engineer');
GO

Fivetran の同期が完了し、Snowflakeで確認すると、下図のようにレコードが追加されます。

レコードの変更

[EmployeeID] が 1 のレコードについて、Position 列を変更してみます。

UPDATE Employees SET Position = 'Senior Software Engineer' WHERE EmployeeID = 1;
GO
1> select * from employees;
2> go
EmployeeID  FirstName LastName  Position
----------- --------- --------- ----------------------
          1 John        Doe      Senior Software Engineer
          2 Jane        Doe      Project Manager
          3 Jim         Beam     Analyst
          4 Alice       Johnson  Software Engineer

(4 行処理されました)

同期完了後、Snowflakeで確認すると、変更がキャプチャされていることが確認できます。

レコードの削除

[EmployeeID] が 2 のレコードを削除してみます。

DELETE FROM Employees WHERE EmployeeID = 2;
GO
1> SELECT * FROM EMPLOYEES ORDER BY 1;
2> GO
EmployeeID  FirstName                                          LastName                                           Position
----------- -------------------------------------------------- -------------------------------------------------- --------------------------------------------------
          1 John                                               Doe                                                Senior Software Engineer
          3 Jim                                                Beam                                               Analyst
          4 Alice                                              Johnson                                            Software Engineer

(3 行処理されました)

同期完了後、Snowflake で確認すると「_FIVETRAN_DELETED」が True に変更され論理削除が実施されていることを確認できます。

列の追加

テーブル構造の変化を伴う列の追加を行います。

ALTER TABLE Employees ADD birthday DATE;
GO
1> SELECT * FROM EMPLOYEES ORDER BY 1;
2> GO
EmployeeID  FirstName                                          LastName                                           Position                                           birthday
----------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ----------------
          1 John                                               Doe                                                Senior Software Engineer                                       NULL
          3 Jim                                                Beam                                               Analyst                                                        NULL
          4 Alice                                              Johnson                                            Software Engineer                                              NULL

(3 行処理されました)

Fivetran 側を確認すると、UI でもカラムの追加を確認できます。

同期後、Snowflake でも下図の通り、列が追加されます。

レコードも追加してみます。

INSERT INTO Employees (FirstName, LastName, Position, birthday) VALUES ('Emily', 'White', 'Data Analyst', '1990-01-15');
GO
1> SELECT * FROM EMPLOYEES ORDER BY 1;
2> GO
EmployeeID  FirstName                                          LastName                                           Position                                           birthday
----------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ----------------
          1 John                                               Doe                                                Senior Software Engineer                                       NULL
          3 Jim                                                Beam                                               Analyst                                                        NULL
          4 Alice                                              Johnson                                            Software Engineer                                              NULL
          5 Emily                                              White                                              Data Analyst                                             1990-01-15

(4 行処理されました)

同期後、Snowflake でも反映されます。

列の削除

[LastName] 列を削除してみます。

ALTER TABLE Employees DROP COLUMN LastName;
GO
1> SELECT * FROM EMPLOYEES ORDER BY 1;
2> GO
EmployeeID  FirstName                                          Position                                           birthday
----------- -------------------------------------------------- -------------------------------------------------- ----------------
          1 John                                               Senior Software Engineer                                       NULL
          3 Jim                                                Analyst                                                        NULL
          4 Alice                                              Software Engineer                                              NULL
          5 Emily                                              Data Analyst                                             1990-01-15

(4 行処理されました)

Fivetran 側でスキーマを確認すると、以下のように表示されなくなります。

同期後、Snowflakeで確認すると、下図のように変更が反映されています。

ソースから削除された列は、宛先にそのまま保持される代わりに、宛先の対応する列の値には Null が書き込まれます。すでに削除済みのレコード(_FIVETRAN_DELETED が True)は変更がキャプチャされないので、そのままの値が残っています。

また、ドキュメントにもあるように、列の削除時は再同期が促されます。
※ 列追加時は再同期無しで処理可能です。
SQL Server Limitations | Fivetran

この時のログを確認すると、forced_resync_table イベントとして記録されています。

Logs - forced_resync_table | Fivetran

列の名前を変更

既存列の名称を変更してみます。

--FirstName列の名称をNameに変更
EXEC sp_rename 'Employees.FirstName', 'Name', 'COLUMN';
GO
1> SELECT * FROM EMPLOYEES ORDER BY 1;
2> GO
EmployeeID  Name                                               Position                                           birthday
----------- -------------------------------------------------- -------------------------------------------------- ----------------
          1 John                                               Senior Software Engineer                                       NULL
          3 Jim                                                Analyst                                                        NULL
          4 Alice                                              Software Engineer                                              NULL
          5 Emily                                              Data Analyst                                             1990-01-15

(4 行処理されました)

同期完了後、Snowflake で確認すると、変更後の列名を持つ新しい列が追加されています。変更前の列は、値が NULL に変更されます。

また、列名の変更時もforced_resync_table イベントとして記録されていました。

テーブルの追加

同期済みのテーブルと同じスキーマにテーブルを追加してみます。

CREATE TABLE products (
  product_id INT IDENTITY(1,1) PRIMARY KEY,
  product_name VARCHAR(255),
  price DECIMAL(10, 2),
  category VARCHAR(100)
);
INSERT INTO products (product_name, price, category) VALUES 
('Laptop', 1200.00, 'Electronics'),
('Smartphone', 800.00, 'Electronics'),
('Coffee Maker', 150.00, 'Kitchen Appliances'),
('Desk Lamp', 45.99, 'Furniture'),
('Ergonomic Chair', 249.99, 'Office Supplies');
GO
1> SELECT * FROM products ORDER BY 1;
2> GO
product_id  product_name                                                                                                                                                                                                                                                    price        category                 
----------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ------------ ----------------------------------------------------------------------------------------------------
          1 Laptop                                                                                                                                                                                                                                                               1200.00 Electronics              
          2 Smartphone                                                                                                                                                                                                                                                            800.00 Electronics              
          3 Coffee Maker                                                                                                                                                                                                                                                          150.00 Kitchen Appliances       
          4 Desk Lamp                                                                                                                                                                                                                                                              45.99 Furniture                
          5 Ergonomic Chair                                                                                                                                                                                                                                                       249.99 Office Supplies          

(5 行処理されました)

Fivetranでもすぐに変更が反映されます。

同期完了後、Snowflake 側でも自動的にテーブルの追加を確認できます。

レコードも問題なく同期されます。

テーブルの削除

さいごにテーブルを削除してみます。

--productsテーブルを削除
DROP TABLE products;
GO

Fivetran では、下図のような形で変更が反映されています。

ただし、Snowflake(Destination)側では、テーブルの削除までは反映されません。ログでは下図のようになっています。

主キーのないテーブルでのスキーマ自動移行の検証

次に、主キーのないテーブルでの挙動も見ておきます。ドキュメントでは以下に記載があります。特に、UPDATE 時の挙動が主キーありの場合と異なります。
Data pipelines for SQL Server | Warehouse configuration for Fivetran

主キーのないテーブルを作成しておきます。

CREATE TABLE example_table (
  id INT,
  name VARCHAR(255),
  description TEXT
);
INSERT INTO example_table (id, name, description) VALUES
(1, 'Item A', 'Description of Item A'),
(2, 'Item B', 'Description of Item B'),
(3, 'Item C', 'Description of Item C');
GO

同期完了後、Snowflakeで確認します。主キーのないテーブルには、「_FIVETRAN_ID」からなるシステム列が追加されます。

System columns | Fivetran

レコードの追加

主キーのないテーブルにレコードを追加します。

INSERT INTO example_table (id, name, description) VALUES
(4, 'Item D', 'Description of Item D');
GO

同期後、Snowflake で確認すると下図の通り主キーありの場合と同様の挙動となります。

レコードの変更

以下の通りレコードを変更してみます。

UPDATE example_table
SET name = 'Updated Item B', description = 'Updated Description for Item B'
WHERE id = 2;
GO

同期後、Snowflakeで確認すると下図のように、既存のレコードが論理削除され、変更後のレコードが追加される形で変更が反映されています。主キーありテーブルとは異なる挙動となります。

レコードの削除

[id] が 3 のレコードを削除してみます。

DELETE FROM example_table WHERE id = 3;
GO

同期後、Snowflakeで確認します。 [_fivetran_id] 列がある場合、この情報を使用し、主キーありテーブルと同様の形式で論理削除が行われます。

列の追加

以下の通り、列を追加します。

ALTER TABLE example_table ADD status VARCHAR(100);
GO

この場合も、主キーありの場合と同様に変更が反映されます。

列の削除

既存列([description])を削除してみます。

ALTER TABLE example_table DROP COLUMN description;
GO
1> SELECT * FROM example_table;
2> GO
id          name                                                                                                                                                                                                                                                            status                                                                                      
----------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------
          1 Item A                                                                                                                                                                                                                                                          NULL                                                                                        
          2 Updated Item B                                                                                                                                                                                                                                                  NULL                                                                                        
          4 Item D                                                                                                                                                                                                                                                          NULL                                                                                        

(3 行処理されました)

この場合、既存レコードではその値が NULL に置き換えられるので、UPDATE 時のように既存レコードは論理削除され、変更後の値(NULL)のレコードが追加される形で変更が反映され、主キーありテーブルとは異なる挙動となります。

変更後のデータは「_fivetran_deleted = FALSE」でフィルタすれば抽出できます。

列の名前を変更

列の名前を変更します。

--name列の名称をitem_nameに変更
EXEC sp_rename 'example_table.name', 'item_name', 'COLUMN';
GO

この場合、新規列が追加され、変更前列のレコード値は、NULL に置き換えられます。(既に論理削除済みのレコードは変更されません。)

主キーのないテーブルを追加

主キーのないテーブル追加時も同様の挙動です。

CREATE TABLE orders (
  order_id INT,
  customer_name VARCHAR(255),
  order_date DATE,
  order_amount DECIMAL(10, 2)
);
GO

Snowflakeで確認

レコードを追加

INSERT INTO orders (order_id, customer_name, order_date, order_amount) VALUES
(1, 'John Doe', '2022-01-01', 100.00),
(2, 'Jane Smith', '2022-01-02', 200.00),
(3, 'Emily Johnson', '2022-01-03', 150.00);
GO

Snowflake で確認

スキーマの追加

さいごにスキーマの追加を試しておきます。

CREATE SCHEMA my_new_schema;
GO

テーブルを作成

CREATE TABLE my_new_schema.my_table (
    id INT PRIMARY KEY,
    name VARCHAR(50)
);
GO

この場合、テーブルを作成するとFivetran 側でも検知され、Snowflake にも同期されていました。

Snowflake 側で確認

さいごに

Fivetran Teleport Sync で RDS SQL Server のデータを同期してみました。
SQL Server 側で追加設定は不要ですが、現時点では同期可能なテーブルサイズに制限があるため、ソース側のテーブルサイズによっては注意が必要と感じました。
他の同期方法も今後試してみたいと思います。こちらの内容が何かの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.